Debizium PostgreSQL
Общее описание
Блок Debizium PostgreSQL подключается к PostgreSQL и отслеживает изменения в реальном времени через логическую репликацию. Преобразует изменения (INSERT, UPDATE, DELETE) в поток событий для дальнейшей обработки.
Блок Debizium PostgreSQL может использоваться в качестве:
- блока – получателя.
Настройка блока
В данном разделе описаны параметры блока Debizium PostgreSQL, которые необходимо заполнить при его настройке.
Базовые параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Name | name | Обязательный параметр Уникальное имя экземпляра коннектора Debezium для PostgreSQL. Используется системой Bercut ESB для его идентификации и регистрации. Попытка зарегистрировать коннектор с уже существующим именем приведет к ошибке. | Строка | |
| Connection | connection | Позволяет выбрать предсозданное подключение к серверу или создать новое с помощью визарда Connection Manager. | Список | |
| Description | description | Краткое описание блока. | Строка | |
| Additional Properties | additionalProperties | Позволяет задать свойства для компонента Debezium, которые нельзя установить напрямую в конфигурации системы. | Строка | |
| Internal Key Converter | internalKeyConverter | Класс конвертера для сериализации и десериализации ключевых данных смещений (offsets). Определяет формат хранения ключей в системе. | org.apache.kafka.connect.json.JsonConverter | Строка |
| Internal Value Converter | internalValueConverter | Класс‑конвертер для сериализации и десериализации данных смещений (offsets). Определяет формат хранения служебной информации о позициях чтения в источнике данных. | org.apache.kafka.connect.json.JsonConverter | Строка |
| Offset Commit Policy | offsetCommitPolicyClass | Класс Java, определяющий условия коммита офсетов (на основе количества обработанных событий и времени с последнего коммита). Должен реализовывать интерфейс OffsetCommitPolicy. | Строка | |
| Offset Commit Timeout Ms | offsetCommitTimeout | Максимальное время (в мс) ожидания фиксации записей и смещений разделов в хранилище смещений перед отменой операции и повторной попыткой в будущем | 5000 | Целое число |
| Offset Flush Interval Ms | offsetCommitInterval | Интервал попытки фиксации смещений (офсетов) | 60000 | Целое число |
| Offset Storage | offsetStorage | Класс, отвечающий за хранение смещений (offsets) коннектора. | org.apache.kafka.connect.storage.FileOffsetBackingStore | Строка |
| Offset Storage File Name | offsetStorageFileName | Путь к файлу, в котором хранятся смещения (offsets), если используется файловое хранилище. | Строка | |
| Offset Storage Partitions | offsetStoragePartitions | Количество партиций для топика смещений, если используется KafkaOffsetBackingStore. | Целое число | |
| Offset Storage Replication Factor | offsetStorageReplicationFactor | Фактор репликации для топика смещений. | Целое число | |
| Offset Storage Topic | offsetStorageTopic | Имя топика, используемого для хранения смещений. | Строка | |
| Binary Handling Mode | binaryHandlingMode | Способ обработки бинарных данных (bytes — как массив байтов). | bytes | Строка |
| Column Exclude List | columnExcludeList | Список столбцов (по шаблону), которые следует исключить из отслеживания изменений. | Строка | |
| Column Include List | columnIncludeList | Список столбцов (по шаблону), которые должны отслеживаться на изменения. | Строка | |
| Column Propagate Source Type | columnPropagateSourceType | Указывает, нужно ли передавать исходный тип данных столбца в схеме сообщения. | Строка | |
| Converters | converters | Опциональный список пользовательских конвертеров для преобразования данных (вместо стандартных). | Строка | |
| Custom Metric Tags | customMetricTags | Пользовательские теги (ключ‑значение) для настройки имени MBean‑объекта (метрики). | Строка | |
| Database Dbname | databaseDbname | Имя базы данных PostgreSQL, из которой фиксируются изменения. | Строка | |
| Database Hostname | databaseHostname | Хост (адрес) сервера PostgreSQL. | Строка | |
| Database Initial Statements | databaseInitialStatements | SQL‑запросы, выполняемые при установлении соединения с БД. | Строка | |
| Database Password | databasePassword | Обязательный параметр Пароль пользователя для подключения к PostgreSQL. | Строка | |
| Database Port | databasePort | Порт сервера PostgreSQL. | 5432 | Целое число |
| Database Query Timeout Ms | databaseQueryTimeoutMs | Тайм‑аут (в мс) для SQL‑запросов к БД. Может задаваться в формате времени (например, 10m). | 10m | Строка |
| Database Sslcert | databaseSslcert | Путь к файлу сертификата клиента для SSL‑подключения. | Строка | |
| Database Sslfactory | databaseSslfactory | Имя класса, создающего SSL‑сокеты при подключении к PostgreSQL через JDBC. | Строка | |
| Database Sslkey | databaseSslkey | Путь к файлу приватного ключа клиента для SSL. | Строка | |
| Database Sslmode | databaseSslmode | Режим SSL‑подключения. Возможные значения:
| prefer | Строка |
| Database Sslpassword | databaseSslpassword | Пароль для доступа к приватному ключу клиента (SSL). | Строка | |
| Database Sslrootcert | databaseSslrootcert | Файл с корневым сертификатом для проверки сервера при SSL. | Строка | |
| Database Tcpkeepalive | databaseTcpkeepalive | Включение/выключение TCP keep‑alive для предотвращения разрыва соединения. | True | Логическое значение |
| Database User | databaseUser | Имя пользователя для подключения к серверу PostgreSQL. | Строка | |
| Datatype Propagate Source Type | datatypePropagateSourceType | Передача исходного типа и длины данных БД в схеме сообщения (для кастомных типов). | Строка | |
| Decimal Handling Mode | decimalHandlingMode | Способ обработки десятичных чисел. Возможные значения:
| precise | Строка |
| Errors Max Retries | errorsMaxRetries | Максимальное число попыток повторной обработки ошибки ( "-1" — бесконечно). | -1 | Целое число |
| Event Processing Failure Handling Mode | eventProcessingFailureHandlingMode | Стратегия обработки ошибок при обработке событий коннектором. Возможные значения:
| fail | Строка |
| Flush Lsn Source | flushLsnSource | Указывает источник для получения позиции LSN (Log Sequence Number), используемой для синхронизации данных при репликации. В контексте Bercut ESB определяет, откуда коннектор берет текущую позицию WAL для корректного продолжения репликации после перезапуска или сбоя. | True | Логическое значение |
| Heartbeat Action Query | heartbeatActionQuery | SQL‑запрос, выполняемый при отправке сигнала «сердцебиения» (heartbeat) для поддержания активности соединения с СУБД и подтверждения работоспособности коннектора. | Строка | |
| Heartbeat Interval Ms | heartbeatIntervalMs | Определяет интервал в миллисекундах, с которым коннектор периодически отправляет сигналы «сердцебиения» (heartbeat) в специальный топик. Эти сигналы подтверждают активность коннектора, помогают отслеживать его состояние, а также предотвращают разрыв соединения из‑за бездействия. Зачение 0 отключает отправку heartbeat‑сигналов | 0ms | Строка |
| Heartbeat Topics Prefix | heartbeatTopicsPrefix | Задает префикс для имен топиков, куда отправляются сигналы «сердцебиения» (heartbeat). Позволяет изолировать служебные сообщения от основных данных. | __debezium-heartbeat | Строка |
| Hstore Handling Mode json | hstoreHandlingMode | Определяет формат преобразования данных типа Hstore (PostgreSQL) при репликации. Возможные значения:
| json | Строка |
| Include Schema Comments | includeSchemaComments | Указывает, нужно ли включать комментарии к объектам схемы (таблицам, столбцам) в метаданные событий репликации. | False | Логическое значение |
| Include Unknown Datatypes | includeUnknownDatatypes | Определяет, следует ли обрабатывать и передавать типы данных, не распознанные коннектором. | False | Логическое значение |
| Incremental Snapshot Chunk Size | incrementalSnapshotChunkSize | Задает количество строк, обрабатываемых за один чанк при инкрементном снимке данных. Позволяет регулировать нагрузку на СУБД во время репликации. | 1024 | Целое число |
| Incremental Snapshot Watermarking Strategy | incrementalSnapshotWatermarkingStrategy | Определяет стратегию установки водяных знаков (watermark) для отслеживания прогресса инкрементного снимка. Возможные значения:
| INSERT_INSERT | Строка |
| Interval Handling Mode | intervalHandlingMode | Задает способ представления интервальных типов данных в сообщениях. Возможные значения:
| numeric | Строка |
| Max Batch Size | maxBatchSize | Максимальное количество записей из источника, обрабатываемых в одном пакете за итерацию. Увеличение значения может повысить пропускную способность, но увеличит нагрузку на память и задержку обработки. Уменьшение значения снижает нагрузку, но может снизить общую производительность. | 2048 | Целое число |
| Max Queue Size | maxQueueSize | Максимальный размер очереди для событий изменений, прочитанных из журнала базы данных, но еще не записанных и не переданных дальше. Должен быть всегда больше максимального размера батча (maxBatchSize). Позволяет буферизовать данные при временных задержках в обработке или отправке. | 8192 | Целое число |
| Max Queue Size In Bytes | maxQueueSizeInBytes | Максимальный объем очереди (в байтах) для событий изменений, прочитанных из журнала базы данных, но еще не записанных и не переданных дальше. Значение 0 означает, что ограничение по объему отключено — очередь может расти без жестких лимитов по размеру (ограничения могут накладываться другими механизмами). Позволяет контролировать потребление памяти и предотвращать переполнение при пиковых нагрузках. | 0 | Целое число |
| Message Key Columns | messageKeyColumns | Список выражений (разделенных точкой с запятой), определяющих полные имена таблиц и колонок, которые будут использоваться в качестве ключа сообщения. Формат каждого выражения: DB_NAME.TABLE_NAME:COLUMN_NAME или SCHEMA_NAME.TABLE_NAME:COLUMN_NAME. | Строка | |
| Message Prefix Exclude List | messagePrefixExcludeList | Список регулярных выражений (разделенных запятой), соответствующих префиксам сообщений логического декодирования, которые исключаются из мониторинга. | Строка | |
| Message Prefix Include List | messagePrefixIncludeList | Список регулярных выражений (разделенных запятой), соответствующих префиксам сообщений логического декодирования для мониторинга. Если параметр не задан, отслеживаются все префиксы (по умолчанию). | Строка | |
| Notification Enabled Channels | notificationEnabledChannels | Список имен каналов уведомлений, которые активированы. Возможные значения зависят от реализации системы. | Строка | |
| Notification Sink Topic Name | notificationSinkTopicName | Имя топика, в который отправляются уведомления. Обязательно, если в списке включенных каналов (notificationEnabledChannels) присутствует значение sink. | Строка | |
| Plugin Name | pluginName | Имя плагина логического декодирования PostgreSQL, установленного на сервере. Определяет формат и способ передачи изменений данных из WAL (Write‑Ahead Log). Возможные значения:
| decoderbufs | Строка |
| Poll Interval Ms | pollIntervalMs | Время (в миллисекундах), которое коннектор ожидает появления новых событий изменений после того, как не получил ни одного события в предыдущем цикле опроса. Коннектор приостанавливается на указанный период перед следующим запросом к источнику данных. | 500ms | Строка |
| Post Processors | postProcessors | Опциональный список пост‑процессоров. Процессоры определяются через параметр .type , настраиваются дополнительными опциями. | Строка | |
| Provide Transaction Metadata | provideTransactionMetadata | Включает извлечение метаданных транзакции вместе с подсчетом событий. | False | Логическое значение |
| Publication Autocreate Mode | publicationAutocreateMode | Определяет поведение коннектора при отсутствии публикации для pgoutput. Возможные значения:
| all_tables | Строка |
| Publication Name | publicationName | Имя публикации PostgreSQL 10+, используемой для потоковой передачи изменений через плагин логического декодирования. Требуется при использовании pgoutput . | dbz_publication | Строка |
| Query Fetch Size | queryFetchSize | Максимальное число записей, загружаемых в память при потоковой передаче данных. Значение 0 использует размер выборки JDBC по умолчанию. | 0 | Целое число |
| Replica Identity Autoset Values | replicaIdentityAutosetValues | Применяется только при потоковой передаче изменений с использованием pgoutput. Определяет значение Replica Identity на уровне таблицы (перезаписывает существующее значение в БД). Формат: список регулярных выражений (через запятую), сопоставляемых с полными именами таблиц (SCHEMA_NAME.TABLE_NAME) и значением Replica Identity. Допустимые значения: DEFAULT, INDEX index_name, FULL, NOTHING. | Строка | |
| Retriable Restart Connector Wait Ms | retriableRestartConnectorWaitMs | Время ожидания (в миллисекундах) перед перезапуском коннектора после возникновения повторяемой ошибки (retriable exception). | 10s | Строка |
| Schema Exclude List | schemaExcludeList | Список схем, для которых события изменений не должны захватываться коннектором. Позволяет исключить ненужные данные из репликации. | Строка | |
| Schema History Internal File Filename | schemaHistoryInternalFileFilename | Путь к файлу, используемому для записи истории схемы базы данных (фиксация изменений структуры БД). | Строка | |
| Schema Name Adjustment Mode | schemaNameAdjustmentMode | Определяет способ корректировки имен схем для совместимости с конвертером сообщений. Возможные значения:
| none | Строка |
| Schema Refresh Mode | schemaRefreshMode | Задает условия, при которых происходит обновление кэша схемы в памяти. Возможные значения:
| columns_diff | Строка |
| Signal Data Collection | signalDataCollection | Имя коллекции данных, используемой для отправки сигналов/команд в Debezium. Для многораздельных коннекторов можно указать несколько коллекций через запятую. При отсутствии значения сигнализация отключена. | Строка | |
| Signal Enabled Channels | signalEnabledChannels | Список имен каналов, через которые принимаются сигналы управления. Канал source включен по умолчанию. Возможные значения:
| source | Строка |
| Signal Poll Interval Ms | signalPollIntervalMs | Интервал (в миллисекундах) для проверки новых сигналов в зарегистрированных каналах. | 5s | Строка |
| Skipped Operations | skippedOperations | Список операций (через запятую), пропускаемых во время потоковой передачи. Возможные значения:
| t | Строка |
| Slot Drop On Stop | slotDropOnStop | Определяет, следует ли удалять слот логической репликации при корректном завершении работы коннектора. Если false, слот сохраняется для возобновления репликации с последней позиции. | False | Логическое значение |
| Slot Max Retries | slotMaxRetries | Количество попыток повторного подключения к слоту репликации при сбое. | 6 | Целое число |
| Slot Name | slotName | Имя слота логической репликации PostgreSQL, создаваемого для потоковой передачи изменений. | debezium | Строка |
| Slot Retry Delay Ms | slotRetryDelayMs | Время ожидания (в миллисекундах) между попытками подключения к слоту репликации после сбоя. | 10s | Строка |
| Slot Stream Params | slotStreamParams | Дополнительные параметры, передаваемые плагину логического декодирования (через точку с запятой). Пример: add-tables=public.table,public.table2 include-lsn=true. | Строка | |
| Snapshot Delay Ms | snapshotDelayMs | Задержка (в миллисекундах) перед началом снимка данных. Позволяет отложить старт репликации для подготовки окружения. | 0ms | Строка |
| Snapshot Fetch Size | snapshotFetchSize | Максимальное число записей, загружаемых в память при выполнении снимка данных (snapshot). | Целое число | |
| Snapshot Include Collection List | snapshotIncludeCollectionList | Список таблиц/коллекций, для которых должен быть выполнен снимок данных при создании или перезапуске коннектора. | Строка | |
| Snapshot Locking Mode | snapshotLockingMode | Задает способ удержания блокировок таблиц во время выполнения снимка схемы. Возможные значения:
| none | Строка |
| Snapshot Locking Mode Custom Name | snapshotLockingModeCustomName | Имя пользовательской реализации (реализует интерфейс SnapshotterLocking), определяющей способ блокировки таблиц во время снимка схемы (используется при snapshotLockingMode=custom). | Строка | |
| Snapshot Lock Timeout Ms | snapshotLockTimeoutMs | Максимальное время (в миллисекундах) ожидания блокировок таблиц в начале снимка. Если блокировки не получены за это время, снимок прерывается. | 10s | Строка |
| Snapshot Max Threads | snapshotMaxThreads | Максимальное количество потоков, используемых для выполнения снимка данных. | 1 | Целое число |
| Snapshot Mode | snapshotMode | Определяет условия запуска снимка при старте коннектора. Возможные значения:
| initial | Строка |
| Snapshot Mode Configuration Based Snapshot Data | snapshotModeConfigurationBasedSnapshotData | Указывает, нужно ли снимать данные при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot On Data Error | snapshotModeConfigurationBasedSnapshotOnDataError | Указывает, нужно ли снимать данные в случае ошибки при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot On Schema Error | snapshotModeConfigurationBasedSnapshotOnSchemaError | Указывает, нужно ли снимать схему в случае ошибки при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot Schema | snapshotModeConfigurationBasedSnapshotSchema | Указывает, нужно ли снимать схему при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Start Stream | snapshotModeConfigurationBasedStartStream | Указывает, нужно ли запускать поток изменений после снимка при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Custom Name | snapshotModeCustomName | Имя пользовательской реализации (реализует интерфейс Snapshotter), определяющей логику снимка (используется при snapshotMode=custom). | Строка | |
| Snapshot Query Mode | snapshotQueryMode | Определяет тип запроса, используемого во время снимка. | select_all | Строка |
| Snapshot Query Mode Custom Name | snapshotQueryModeCustomName | Имя пользовательской реализации (реализует интерфейс SnapshotterQuery), определяющей построение запросов во время снимка (используется при snapshotQueryMode=custom). | Строка | |
| Snapshot Select Statement Overrides | snapshotSelectStatementOverrides | Список таблиц (через запятую) с переопределенными SELECT‑запросами для снимка. Позволяет задать кастомные запросы для отдельных таблиц. Формат: DB_NAME.TABLE_NAME или SCHEMA_NAME.TABLE_NAME. | Строка | |
| Snapshot Tables Order By Row Count | snapshotTablesOrderByRowCount | Определяет порядок обработки таблиц в начальном снимке по количеству строк. Возможные значения:
| disabled | Строка |
| Sourceinfo Struct Maker | sourceinfoStructMaker | Имя класса SourceInfoStructMaker, который возвращает схему и структуру SourceInfo для событий репликации. | io.debezium.connector.postgresql.PostgresSourceInfoStructMaker | Строка |
| Status Update Interval Ms | statusUpdateIntervalMs | Частота отправки обновлений статуса соединения репликации на сервер (в миллисекундах). Позволяет отслеживать состояние подключения. | 10s | Строка |
| Streaming Delay Ms | streamingDelayMs | Задержка (в миллисекундах) между завершением снимка данных и началом потоковой передачи изменений. Позволяет выполнить дополнительные подготовительные действия. | 0ms | Строка |
| Table Exclude List | tableExcludeList | Список регулярных выражений (через запятую), сопоставляемых с полными именами таблиц, которые следует исключить из мониторинга. Позволяет фильтровать таблицы по шаблонам. | Строка | |
| Table Ignore Builtin | tableIgnoreBuiltin | Флаг, указывающий, следует ли игнорировать встроенные (системные) таблицы PostgreSQL. | True | Логическое значение |
| Table Include List | tableIncludeList | Список таблиц, для которых должны захватываться изменения данных. Используется для точной настройки репликации — обрабатываются только указанные таблицы. | Строка | |
| Time Precision Mode | timePrecisionMode | Определяет способ представления временных типов данных (TIME, DATE, TIMESTAMP). Возможные значения:
| adaptive | Строка |
| Tombstones On Delete | tombstonesOnDelete | Указывает, следует ли представлять операции удаления как два события: событие удаления и последующее «надгробие» (tombstone). Если true, система может полностью удалить все события с данным ключом после удаления записи в источнике. | False | Логическое значение |
| Topic Naming Strategy | topicNamingStrategy | Имя класса TopicNamingStrategy, определяющего правила формирования имен топиков для разных типов событий (изменения данных, схемы, транзакции, heartbeat и т. д.). | io.debezium.schema.SchemaTopicNamingStrategy | Строка |
| Topic Prefix | topicPrefix | Обязательный параметр Префикс топиков, идентифицирующий сервер/кластер БД. Должен быть уникальным для всех коннекторов. Используется как основа для имен топиков. Допустимы: буквы, цифры, дефисы, точки, подчеркивания. | Строка | |
| Transaction Metadata Factory | transactionMetadataFactory | Класс, отвечающий за создание контекста транзакции и структур/схем транзакций. Определяет, как будут представлены транзакции в событиях репликации. | io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory | Строка |
| Unavailable Value Placeholder | unavailableValuePlaceholder | Константа, указывающая, что исходное значение является TOAST‑значением, не предоставленным БД. Если начинается с hex:, оставшаяся часть строки интерпретируется как шестнадцатеричное представление октетов. | __debezium_unavailable_value | Строка |
| Xmin Fetch Interval Ms | xminFetchIntervalMs | Интервал (в миллисекундах), с которым значение xmin (нижняя граница для нового слота репликации) запрашивается из слота репликации. Меньшее значение повышает точность, но увеличивает нагрузку. Значение 0 отключает отслеживание xmin. | 0ms | Строка |
Расширенные параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Bridge Error Handler | bridgeErrorHandler | Определяет стратегию обработки ошибок на уровне интеграционного моста (bridge) в Bercut ESB. Отвечает за реакцию системы на сбои при передаче сообщений между компонентами. Может включать повторные попытки отправки, перенаправление в очередь ошибок, логирование и т. д. Функция доступна только для тех сторонних компонентов, которые позволяют системе получать уведомления о возникших исключениях. Некоторые компоненты обрабатывают ошибки внутри себя — в таких случаях использование bridgeErrorHandler невозможно. | False | Логическое значение |
| Exception Handler | exceptionHandler | Определяет стратегию обработки исключений на уровне компонента. По умолчанию используется стандартный обработчик org.apache.camel.spi.ExceptionHandler, который логирует ошибки на уровнях WARN или ERROR и игнорирует их. Позволяет настроить кастомную логику обработки (повторные попытки, перенаправление в очередь ошибок и т. д.). | Строка | |
| Exchange Pattern | exchangePattern | Задает шаблон взаимодействия между компонентами в интеграционном потоке. Определяет способ передачи сообщений и ожидания ответа. Возможные значения:
| Строка |